package com.atguigu.flink.chapter11;

import com.atguigu.flink.bean.WaterSensor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @Author lzc
 * @Date 2022/10/31 10:41
 */
public class Flink02_TableApi_BaseUse_Agg {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);
    
        DataStreamSource<WaterSensor> stream = env.fromElements(
            new WaterSensor("s1", 1L, 10),
            new WaterSensor("s2", 1L, 10),
            new WaterSensor("s1", 2L, 20),
            new WaterSensor("s1", 3L, 30),
            new WaterSensor("s1", 4L, 40),
            new WaterSensor("s1", 5L, 50)
        );
        
        // 1. 创建一个表的执行环境
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        
        // 2. 使用表的执行环境把一个流转成表(动态表)
        // 会自动把 pojo 中的属性名作为表的字段名
        Table table = tEnv.fromDataStream(stream);
        // select id, sum(vc) as sum_vc from t group by id
        Table result = table
            .groupBy($("id"))
            .aggregate($("vc").sum().as("sum_vc"))
            .select($("id"), $("sum_vc"));
    
       /* DataStream<Tuple2<Boolean, Row>> resultStream = tEnv
            .toRetractStream(result, Row.class);*/
        tEnv
            .toRetractStream(result, Row.class)
            .filter(t -> t.f0)
            .map(t -> t.f1)
            .print();
          
    
    
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/*
TableApi

SQL

 */